package com.godenwater.recv.server.sl651;

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.godenwater.recv.RecvConstant;

import cn.gov.mwr.sl651.HydroBuilder;
import cn.gov.mwr.sl651.IMessage;
import cn.gov.mwr.sl651.IMessageBody;
import cn.gov.mwr.sl651.IMessageHeader;
import cn.gov.mwr.sl651.Symbol;
import cn.gov.mwr.sl651.command.DownCommand;
import cn.gov.mwr.sl651.header.HexHeader;
import cn.gov.mwr.sl651.utils.ByteUtil;
import cn.gov.mwr.sl651.utils.StcdParser;
import cn.gov.mwr.sl651.utils.CRC16Helper;

public class ServerDataHandler extends IoFilterAdapter implements IoHandler {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private final HydroServer server;

    private String channel;
    private int port;
    /**
     * 针对多条消息接收完以后的合并处理，如图片消息，大数据消息
     */
    private ConcurrentHashMap M3 = new ConcurrentHashMap();

    public ServerDataHandler(HydroServer server, String channel, int port) {
        this.server = server;
        this.channel = channel;
        this.port = port;
    }

    /**
     * 消息接收后的处理，只处理业务逻辑，此消息处理是在解码后，所以只需要关注消息体的内容
     */
    public void messageReceived(IoSession session, Object message)
            throws Exception {

        StringBuffer log = new StringBuffer();
        log.append("----------------------------------------------------------------------\n");
        log.append("接收通道" + this.channel + " 服务端已收到 " + server.messageSize() + " 消息数;\n");


        if (message instanceof IoBuffer) {
            System.out.println(">>> HydroServerHandler message is IoBuffer ");
        }

        // 纯粹只处理报文，此消息体已经被解码为单体报文
        IMessage msg = (IMessage) message;

        // 3、应答回复或重发报文请求
        IMessage replyMsg = null;
        IMessageHeader header = msg.getHeader();
        byte[] funcCode = header.getFuncCode();
        byte mode = header.getBodyStartBit()[0];
        byte[] stationAddr = header.getStationAddr();
        String stcd = StcdParser.parseStcd(stationAddr);// 此处有问题，若是多个测站共用一个测站编码，会出现召测不到信息的情况。

        // 将测站与session关联起来
        HydroServer.getInstance().getSessionManager().bindSession(session, RecvConstant.SL651, stcd);

        // 1、模式选择，如果是2F表示为M1模式，不需要应答
        if (funcCode[0] != (byte) 0x2F) {// M1模式，无需应答

            // 1、 校验消息
            boolean crcFlag = checkCRC(msg);// 校验消息体，CRC校验

            // 2、 存放于消息队列中，等待处理。将队列放在此处，减少了对链接报的处理
            //ChannelMessage cm = new ChannelMessage();
            //cm.setChannel(channel);
            //cm.setMessage(msg);
            //cm.setCrcFlag(crcFlag);
            //server.queuePush(cm);
            //HydroServer.getInstance().getMessageManager().append(RecvConstant.SL651,msg);

            // M2模式
            if (mode == Symbol.STX && this.channel.equalsIgnoreCase("TCP")) {
                if (crcFlag) {
                    // 确认应答
                    log.append("校验结果，报文CRC验证<相一致>，M2模式发送\"应答\"回复....\n");

                    replyMsg = replyMessage(msg);
                } else {
                    // 重发请求
                    log.append("校验结果，报文CRC验证\"不一致 \"，M2模式发送\"重发\"请求....\n");

                    replyMsg = repeatMessage(msg);
                }
            }

            // M3模式,最后一条报文才进行应答，然后后再考虑重发
            if (mode == Symbol.SYN && this.channel.equalsIgnoreCase("TCP")) {
                if (crcFlag) {
                    // 最后一个再确认应答，应答后再检查是否有错误包，有错误包，再发送重发请求
                    log.append("校验结果，报文CRC验证相一致，M3模式发送应答回复....\n");
//					logger.info(log);
//					HydroServer.getInstance().getMonitorManager().append(log);

                    // 判断是否为最后一条报文，如果是，才进行回复
                    replyMsg = replyMessage(msg);
                    // 重发错误报文请求
                    // repeatMessage();
                } else {
                    // 有错误包，需进行记录
                    log.append("校验结果，报文CRC验证不一致，M3模式发送重发请求....\n");
//					logger.info(log);
//					HydroServer.getInstance().getMonitorManager().append(log);

                    server.saveErrMessage(msg);
                }
            }
        }

        if (replyMsg != null && (this.channel.equalsIgnoreCase("TCP") || this.channel.equalsIgnoreCase("UDP"))) {
            log.append("[回复报文] " + HydroBuilder.toHexString(replyMsg, Symbol.DOWN) + "\n");
            session.write(replyMsg);
        }

        logger.info(log.toString());
        //HydroServer.getInstance().getMonitorManager().append(log.toString());
        // 启动重发机制

    }

    /**
     * 构造HEX回复报文
     *
     * @param msg
     * @return
     */
    public IMessage replyMessage(IMessage msg) {
        try {
            IMessageHeader header = msg.getHeader();
            IMessageBody body = msg.getBody();

            // 取出序列值
            byte[] content = body.getContent();
            byte[] serialid;
            int seqid;
            if (header instanceof HexHeader) {
                serialid = new byte[2];
                System.arraycopy(content, 0, serialid, 0, 2);
                seqid = ByteUtil.bytesToUshort(serialid);
            } else {
                serialid = new byte[4];
                System.arraycopy(content, 0, serialid, 0, 4);
                String hexstr = new String(serialid);
                seqid = ByteUtil.bytesToUshort(ByteUtil
                        .HexStringToBinary(hexstr));
            }

            // 报文回复
            byte[] funcCode = header.getFuncCode();
            byte mode = header.getBodyStartBit()[0];

            DownCommand cmd = new DownCommand();
            cmd.setStartBit(header.getStartBit()[0]);
            cmd.setCenterAddr(header.getCenterAddr());
            cmd.setStationAddr(header.getStationAddr());
            cmd.setPassword(header.getPassword());
            cmd.setBodyStartBit(Symbol.STX);
            cmd.setFuncCode(funcCode);

            byte funccode = (byte) 0x2F;
            if (funcCode.length == 1) {
                funccode = funcCode[0];
            }
            if (funcCode.length == 2) {
                String hexFuncCode = new String(funcCode);
                funccode = ByteUtil.HexStringToBinary(hexFuncCode)[0];
            }

            // 1、模式选择，如果是2F，不需要应答
            if (funccode != (byte) 0x2F) {// M1模式，无需应答

                // M2模式
                if (mode == Symbol.STX) {

                    // 2、判断结束符是哪种格式
                    // 2.1 ：在报文分包传输时作为结束符，表示未完成，不可退出通信
                    if (msg.getEOF() == Symbol.ETB) {
                        // ACK 肯定确认，继续发送，作为有后续报文帧的“确认”结束符。
                        // NAK 否定应答，反馈重发， 用于要求对方重发某数据包的报文结束符。
                        // ENQ 作为下行查询及控制命令帧的报文结束符。

                        cmd.setEof(Symbol.ACK);

                    }

                    // 2.2：作为报文结束符，表示传输完成等待退出通信，
                    if (msg.getEOF() == Symbol.ETX) {
                        // --- 反馈用EOT 作为传输结束确认帧报文符，表示可以退出通信。
                        // --- ESC 要求终端在线。保持在线10分钟内若没有接收到中心站命令，终端退回原先设定的工作状态
                        cmd.setEof(Symbol.ESC);

                    }

                    return cmd.sendReplyMessage(seqid, null);

                } else {
                    // M3模式
                    if (msg.getEOF() == Symbol.ETB) {
                        // ETB不做应答
                        return null;
                    }

                    // 表示到最后一条报文
                    if (msg.getEOF() == Symbol.ETX) {
                        cmd.setEof(Symbol.EOT);
                    }
                    return cmd.sendReplyMessage(seqid, null);

                }

            } else {
                // M1模式，不做回答
                return null;
            }

        } catch (Exception e) {
            e.printStackTrace();
            logger.info(">> 构造回复报文出现异常！" + e.getMessage());
            return null;
        }
    }

    /**
     * 构造重发报文????有问题未完成
     *
     * @param msg
     * @return
     */
    public IMessage repeatMessage(IMessage msg) {

        try {
            IMessageHeader header = msg.getHeader();
            IMessageBody body = msg.getBody();

            byte[] content = body.getContent();
            byte[] serialid = new byte[2];
            if (content.length > 2) {
                System.arraycopy(content, 0, serialid, 0, 2);
            }
            // 报文回复
            byte[] funcCode = header.getFuncCode();
            byte mode = header.getBodyStartBit()[0];

            // 1、模式选择，如果是2F，不需要应答
            if (funcCode[0] != (byte) 0x2F) {// M1模式，无需应答

                if (mode == Symbol.STX) {
                    // M2模式，直接发送重发请求
                    DownCommand cmd = new DownCommand();
                    cmd.setCenterAddr(header.getCenterAddr());
                    cmd.setStationAddr(header.getStationAddr());
                    cmd.setPassword(header.getPassword());

                    cmd.setStartBit(header.getStartBit()[0]);
                    cmd.setBodyStartBit(Symbol.STX);
                    cmd.setFuncCode(funcCode);

                    cmd.setEof(Symbol.NAK);

                    return cmd.sendRepeatMessage(
                            ByteUtil.bytesToUshort(serialid), null);
                } else {
                    // M3模式
                    if (msg.getEOF() == Symbol.ETB) {
                        // 将错误报文，存储以将
                        server.saveErrMessage(msg);
                        return null;
                    }

                    if (msg.getEOF() == Symbol.ETX) {
                        // 将错误报文取出，并构建新报文，进行重发请求

                        return null;
                    }
                    return null;
                }
            } else {
                return null;
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.info(">> 构造重发报文出现异常！" + e.getMessage());
            return null;
        }
    }

    /**
     * CRC校验
     *
     * @param message
     * @return
     */
    public boolean checkCRC(IMessage message) {

        IMessageHeader header = message.getHeader();
        IMessageBody body = message.getBody();

//		logger.info("校验报文， 开始校验CRC...  ");

        byte[] bytes = new byte[header.getLength() + body.getLength() + 1];

        int pos = 0;
        System.arraycopy(header.getStartBit(), 0, bytes, pos,
                header.getStartBit().length);
        pos = pos + header.getStartBit().length; // 此处将取值直接改为取数据中字节的长度，对字节处理更精确

        System.arraycopy(header.getCenterAddr(), 0, bytes, pos,
                header.getCenterAddr().length);
        pos = pos + header.getCenterAddr().length;

        System.arraycopy(header.getStationAddr(), 0, bytes, pos,
                header.getStationAddr().length);
        pos = pos + header.getStationAddr().length;

        System.arraycopy(header.getPassword(), 0, bytes, pos,
                header.getPassword().length);
        pos = pos + header.getPassword().length;

        System.arraycopy(header.getFuncCode(), 0, bytes, pos,
                header.getFuncCode().length);
        pos = pos + header.getFuncCode().length;

        System.arraycopy(header.getBodySize(), 0, bytes, pos,
                header.getBodySize().length);
        pos = pos + header.getBodySize().length;

        System.arraycopy(header.getBodyStartBit(), 0, bytes, pos,
                header.getBodyStartBit().length);
        pos = pos + header.getBodyStartBit().length;

        if (header.getBodyStartBit()[0] == Symbol.SYN) {

            System.arraycopy(header.getBodyCount(), 0, bytes, pos,
                    header.getBodyCount().length);
            pos = pos + header.getBodyCount().length;
        }

        System.arraycopy(body.getContent(), 0, bytes, pos,
                body.getContent().length);
        pos = pos + body.getContent().length;

        // System.arraycopy(message.getEOF(), 0, bytes, pos, 1);
        bytes[pos] = message.getEOF();

        byte[] crcResult = CRC16Helper.crc16Check(bytes);

        // System.out.println(">> crcResult " +
        // ByteUtil.toHexString(crcResult));

        if (header.getStartBit()[0] == Symbol.SOH_ASC) {

            // 需要重新拼接为原字符串

            // 转换为字符串比较
            String calStr = ByteUtil.toHexString(crcResult);

            String crcStr = new String(message.getCRC());
            // System.out.println(">> crcStr " + crcStr);

            if (calStr.equalsIgnoreCase(crcStr)) {
                return true;
            } else {

                return false;
            }

        } else {
            if (Arrays.equals(crcResult, message.getCRC())) {

                return true;
            } else {
                return false;
            }
        }

    }

    public void viewInfo(IMessage message) {

        System.out.println("******* 中心站查询遥测站实时数据 *******");
        System.out.println("原始包长度：25字节");
        System.out
                .println("原始包信息：7E7E0012345678FFFFFF378008020000130311173540057E13");
        System.out.println("中心站地址：255");
        System.out.println("水文特征码：00，水文测站编码：12345678");
        System.out.println("密码：FFFF");
        System.out.println("功能码：37");
        System.out.println("流水号：0，发报时间：2013-03-11 17:35:40");

    }

    /**
     * 发送应答式消息
     */
    public void sendAck(IoSession session) {
        // session.write(message);
    }

    /**
     * {@inheritDoc}
     */
    public void sessionClosed(IoSession session) throws Exception {
        //logger.info("关闭会话  " + session.getRemoteAddress());
        HydroServer.getInstance().getSessionManager().removeSession(session);
    }

    /**
     * 当session创建连接后，需添加到一个客户端中
     */
    public void sessionCreated(IoSession session) throws Exception {
//		logger.info("创建会话  " + session.getRemoteAddress());
        String log = "创建会话  " + session.getRemoteAddress();
        HydroServer.getInstance().queueLogsPush(log);
        HydroServer.getInstance().getSessionManager().createSession(session);
    }

    /**
     * {@inheritDoc} 在此对空闲的session进行关闭
     */
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        // logger.info("空闲会话  " + session.getRemoteAddress());
        HydroServer.getInstance().getSessionManager().idleSession(session);
    }

    /**
     * {@inheritDoc}
     */
    public void sessionOpened(IoSession session) throws Exception {
        // logger.info("Session Opened...");

    }

    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {

        session.close(true);

        // logger.info("会话出现异常  " + session.getRemoteAddress() + " \t"
        // + cause.getMessage());
        // cause.printStackTrace();
        // IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session,
        // logger);
        logger.info(
                "会话出现异常  " + session.getRemoteAddress() + " \t"
                        + cause.getMessage(), cause);

    }

    /**
     * 消息应答事件
     */
    public void messageSent(IoSession session, Object message) throws Exception {
        // TODO Auto-generated method stub
        //logger.info("应答会话  " + session.getRemoteAddress());
        // session.close(true);
    }

    public void inputClosed(IoSession session) throws Exception {

    }


}
